\documentclass[10pt]{article}
\usepackage{xspace}
\usepackage{microtype}
\usepackage[margin=1in]{geometry}
\usepackage{ragged}
\usepackage{url}
\usepackage{paralist}
\usepackage{listings}

\lstset{language=C++,
        basicstyle=\ttfamily\scriptsize}
        
\newcommand{\p}{Piccolo\xspace}

\newenvironment{innerlist}[1][\enskip\textbullet]%
        {\begin{compactitem}[#1]}{\end{compactitem}}

\begin{document}

\date{}
\author{}
\title{Piccolo API Documentation}
\maketitle

\section{Overview}
\p is a system for writing high-performance distributed programs using a
simple, flexible table interface.

Developing a \p program consists of defining a set of tables to work with and a
set of kernel functions that operate on these tables, reading, writing and
updating values to achieve a certain goal.  This document walks through the
development of a simple \p program that adds together all of the entries from
two tables, to show how these elements work together.

(\p programs can be written in C++ or Python; this document focuses on the C++
side of things, but the development of a Python program is very similar.)
 
\section{Tables}
Development in \p revolves around the concept of {\em global tables}.  The API
exported by these tables is shown in figure~\ref{fig:tableapi}.

\begin{figure}[h!]
\begin{lstlisting}
template <class K, class V>
class GlobalTable {
 public:
  // Fetch the value associated with 'key'
  V get(K key);

  // Insert the given key-value pair, replacing
  // any existing value.
  void put(K key, V value);

  // Insert the given key-value pair, applying
  // the accumulation function if necessary
  void update(K key, V value);

  // Resize the table to at least max(size, nelements)
  void resize(int64_t size);

  // Wipe the contents of the table
  void clear();

  // Return an iterator over all local data for this table.
  Iterator* get_iterator();
};

// Create a new table with the given integer id.
CreateTable(table_id, num_shards, sharding_policy, accumulator);
\end{lstlisting}
\caption{\sffamily Global Table API \label{fig:tableapi}}
\end{figure}

Tables are divided into multiple {\em shards}, which are distributed across
all available workers.  The sharding policy determines how keys are assigned to
shards; the sharding API and the default sharding policies are shown in
figure~\ref{fig:shardingapi}.

\begin{figure}[h!]
\begin{lstlisting}

// A custom sharder must implement this interface - 
// it can then be passed to the CreateTable method.
template <class K>
struct Sharder {
  virtual int operator()(const K& k, int shards) = 0;
};

struct Sharding {
  // Requires the key be a string.  Returns hash(key) % num_shards
  struct String;
  
  // Requires the key be an integral type.  Returns key % num_shards.
  struct Mod;
};
\end{lstlisting}
\caption{\sffamily Sharding Functions}
\label{fig:shardingapi}
\end{figure}
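
As a minimal sketch of the interface above, a custom sharder might look like the
following.  The {\ttfamily Sharder} template is re-declared locally so that the
listing compiles on its own; {\ttfamily RangeSharder} and its
{\ttfamily keys\_per\_shard} parameter are hypothetical names used for
illustration, not part of \p.

\begin{lstlisting}
#include <cstdint>

// Local copy of the interface from the figure, so this sketch
// is self-contained.
template <class K>
struct Sharder {
  virtual ~Sharder() {}
  virtual int operator()(const K& k, int shards) = 0;
};

// Hypothetical sharder: places contiguous ranges of keys in the
// same shard, so that neighboring keys share locality.
struct RangeSharder : public Sharder<int32_t> {
  explicit RangeSharder(int32_t keys_per_shard)
      : keys_per_shard_(keys_per_shard) {}

  int operator()(const int32_t& k, int shards) {
    // Assumes non-negative keys.  Ranges wrap around once the key
    // space exceeds keys_per_shard * shards.
    return (k / keys_per_shard_) % shards;
  }

  int32_t keys_per_shard_;
};
\end{lstlisting}

Under these assumptions, an instance such as {\ttfamily new RangeSharder(1000)}
would be passed to CreateTable in place of one of the default policies.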

To behave correctly in the face of concurrent update() calls, tables rely on an
accumulation operator to merge multiple values together.  For correctness, this
operator should generally be commutative and associative, since concurrent
updates may be applied in any order; depending on the application, this may not
be strictly necessary.  The interface for defining an accumulator and the
default accumulators are shown in figure~\ref{fig:accumapi}.

\begin{figure}[h!]
\begin{lstlisting}  

// The required interface for accumulators.  'a' is the current value
// for a table entry, 'b' is an update.  'a' should be modified after
// execution to represent a merged value. 
template <class V>
struct Accumulator {
  virtual void Accumulate(V* a, const V& b) = 0;
};

// Default accumulation policies.
struct Accumulators {
  struct Min;
  struct Max;
  struct Sum;
  struct Replace;
};

\end{lstlisting}
\caption{\sffamily Accumulator API}
\label{fig:accumapi}
\end{figure}
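
To illustrate the interface, here is a sketch of a user-defined accumulator.
The {\ttfamily Accumulator} template is re-declared locally so that the listing
stands alone; the {\ttfamily Stats} value type and the {\ttfamily StatsSum}
accumulator are hypothetical and exist only for this example.  Merging field by
field keeps the operator insensitive to the order in which updates arrive (up to
floating-point rounding).

\begin{lstlisting}
// Local copy of the interface from the figure.
template <class V>
struct Accumulator {
  virtual ~Accumulator() {}
  virtual void Accumulate(V* a, const V& b) = 0;
};

// Hypothetical value type: enough state to compute a running mean.
struct Stats {
  long count;
  double sum;
};

// Hypothetical accumulator: merges two partial results by adding
// their fields.  'a' is the current entry and is modified in place.
struct StatsSum : public Accumulator<Stats> {
  void Accumulate(Stats* a, const Stats& b) {
    a->count += b.count;
    a->sum += b.sum;
  }
};
\end{lstlisting}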

\section{Kernels}

In \p, tables hold the data and provide an API for operating on it, but how do
we actually make use of this?  {\em Kernels} are \p's mechanism for working with
tables in parallel.  As a user, you can specify kernel code in two ways: either
by defining a kernel class to be instantiated and run, or by using the PMap,
PRunAll, and PRunOne convenience methods.  This document focuses on the
convenience methods; their interface is given in figure~\ref{fig:methodapi}.

 \begin{figure}[h!]
\begin{lstlisting}
// Execute code once for each shard of the given 
// table, with locality for that shard.
PRunAll(table, code);

// Execute code once; with locality for an arbitrary shard.
PRunOne(table, code);

// Execute code once for each value in the given table.  The
// value will be bound to the specified name
PMap({ v : table }, code);
\end{lstlisting}
\caption{\sffamily Parallel Run Methods}
\label{fig:methodapi}
\end{figure}

\section{Triggers}
Some applications benefit greatly from the locality and partitioning features of
\p, but either need to modify or examine only a small number of key-value pairs
on each iteration, or cannot be cleanly divided into iterations.  To meet the
needs of these applications, \p now has a trigger system under development.
Triggers can be set on any table, and contain code that is executed whenever a
key-value pair is updated.  A trigger can allow the update to occur as-is,
intervene to prevent the update, or even modify the update value.  To avoid
unexpected behavior, a mechanism is in place to enable and disable \p
triggers.  The current trigger interface is given in figure~\ref{fig:trigapi},
but in the near future it is subject to minor changes and major additions.

As of late April 2011, \p contains two types of trigger: long- and short-running
triggers.  The specific mechanics and interface for these triggers are still being
designed.

Note: The documentation for triggers is outdated and will be updated shortly.
Accumulators and triggers were merged while building the Oolong project on the
\p code base.  One important caveat: be careful of trigger side effects during
checkpoint restoration.

 \begin{figure}[h!]
\begin{lstlisting}

// Create a new table with the given integer id and thread(s) to
// execute long-running triggers
CreateTable(table_id, num_shards, sharding_policy, accumulator, num_longtrigger_threads);

// Define trigger class
class MyTrigger : public Trigger<K, V> {
  public:
    // Return true to allow update to occur, or false to
    // prevent update.  value will contain the current value
    // for key, and newvalue is the new value to be set.
    // newvalue may be modified in the Fire method, in which
    // case Fire should return true.
    bool Fire(const K& key, const V& value, V& newvalue) {

      //triggers should not call update()
      sometable->enqueue_update(otherkey, othervalue);

      //short/normal triggers can set 

      return true;  // or false to prevent the update
    }
    // Return true to make the LongFire be rescheduled, or
    // false to make this the last scheduled execution.  The
    // long trigger will be occasionally fired until it returns
    // false.
    bool LongFire(const K& key) {
      return true;  // or false to stop rescheduling
    }
};

// Register trigger on table
// Must be performed before workers are started
TriggerID trigid = table->register_trigger(new MyTrigger);

// Disable specified trigger
// Must be performed after workers are started
m.enable_trigger(trigid,tablenumber,false);

// Enable specified trigger
m.enable_trigger(trigid,tablenumber,true);

\end{lstlisting}
\caption{\sffamily Trigger API}
\label{fig:trigapi}
\end{figure}
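
The three possible outcomes of a short-running trigger (allow, prevent, modify)
can be sketched without \p itself.  In the listing below, {\ttfamily Trigger} is
a local stand-in for the base class in the figure, while
{\ttfamily ClampTrigger} and {\ttfamily apply\_update} are hypothetical names;
in particular, {\ttfamily apply\_update} only mimics the order of operations
described above and is not how \p's tables are implemented.

\begin{lstlisting}
#include <map>

// Local stand-in for the Trigger base class (short-running part only).
template <class K, class V>
struct Trigger {
  virtual ~Trigger() {}
  virtual bool Fire(const K& key, const V& value, V& newvalue) = 0;
};

// Hypothetical trigger: rejects negative values and caps all
// other values at 'limit'.
struct ClampTrigger : public Trigger<int, double> {
  explicit ClampTrigger(double limit) : limit_(limit) {}

  bool Fire(const int& key, const double& value, double& newvalue) {
    (void)key;
    (void)value;
    if (newvalue < 0) return false;            // prevent the update
    if (newvalue > limit_) newvalue = limit_;  // modify the update
    return true;                               // allow the update
  }

  double limit_;
};

// Hypothetical helper: consult the trigger, then store the value
// only if the trigger allowed it.
inline bool apply_update(std::map<int, double>& table,
                         Trigger<int, double>& t,
                         int key, double newvalue) {
  double current = table[key];  // a missing key starts out as 0
  if (!t.Fire(key, current, newvalue)) return false;
  table[key] = newvalue;
  return true;
}
\end{lstlisting}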

\section{Example Program}
With these definitions out of the way, we can start looking at how to implement
our simple program.  By convention, \p programs are written in files ending with
{\ttfamily .pp}; this example assumes you've created such a file in
{\ttfamily src/examples/simple-program.pp}.  First, assuming we're going to link
into the example library, let's define our entry point:

\begin{lstlisting}
static TypedGlobalTable<int32_t, double> *a, *b, *c;

static int SimpleProgram(ConfigData& conf) {
\end{lstlisting} 

This declares a set of tables which we will reference later, and starts the main
program code.  {\ttfamily ConfigData} is passed to us by the example driver
program, but we don't need to worry about it just yet.  Let's create our tables:

\begin{lstlisting}
a = CreateTable(0, 100, new Sharding::Mod, new Accumulators::Sum);
b = CreateTable(1, 100, new Sharding::Mod, new Accumulators::Sum);
c = CreateTable(2, 100, new Sharding::Mod, new Accumulators::Sum);
\end{lstlisting}

These tables all have 100 shards, are sharded by taking the modulo of the key,
and will respond to update calls by adding the current and new values together.

\begin{lstlisting}
if (StartWorker(conf)) {
    return 0;  // this process ran as a worker
}
Master m(conf);  // stays in scope for the kernel calls below
\end{lstlisting}

We now check if we are running as the master node of the system.  If
StartWorker() returns false, then we are the master; otherwise StartWorker()
will take control of execution (the running machine becomes a worker node).

Now we are exclusively running on the master.  Let's assign some random values
to our 'a' and 'b' tables:
\begin{lstlisting}
    PRunAll(a, {
      a->resize(100000);
      b->resize(100000);
      const int num_shards = a->num_shards();
      for (int64_t i = current_shard(); i < 100000; i += num_shards) {
        a->update(i, random());
        b->update(i, random());
      }
    });
\end{lstlisting}

As this code is run from PRunAll, it will be executed on all 100 shards of 'a'
in parallel.  In this case, we're also updating the 'b' table for convenience
(since updates are fast, this is generally okay).  For each shard, we are
updating a fraction of the total entries, setting them to a random value.  (The
initial value for a table entry is 0.)
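
The strided loop above can be checked in isolation.  The sketch below assumes
that {\ttfamily current\_shard()} returns an index between 0 and the number of
shards minus one; {\ttfamily keys\_for\_shard} is a hypothetical helper that
collects the keys one shard's loop would touch.  Every key it returns is
congruent to the shard index modulo the number of shards, which is the shard
that {\ttfamily Sharding::Mod} assigns that key to.

\begin{lstlisting}
#include <vector>

// Hypothetical helper: the keys visited by the loop body for one
// shard, i.e. shard, shard + num_shards, shard + 2 * num_shards, ...
std::vector<long> keys_for_shard(int shard, int num_shards,
                                 long num_keys) {
  std::vector<long> keys;
  for (long i = shard; i < num_keys; i += num_shards) {
    keys.push_back(i);
  }
  return keys;
}
\end{lstlisting}

With 100 shards and 100000 keys, each shard therefore handles 1000 keys, and no
key is handled by more than one shard.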

Now we can try adding the entries together, and putting them into 'c':
\begin{lstlisting}
    PMap({v : a}, { c->update(k, v); });
    PMap({v : b}, { c->update(k, v); });
\end{lstlisting}

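Here we just mapped over 'a' and 'b' separately, and invoked update on 'c' for
each entry (with the entry's key available as {\ttfamily k}).  Because 'c' was
created with the {\ttfamily Sum} accumulator, this results in 'c' containing,
for each key, the sum of the corresponding values in 'a' and 'b'.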
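
The effect of the two passes can be reproduced on a single machine with ordinary
maps.  This is only a sketch of the semantics, not of \p's implementation:
{\ttfamily Table}, {\ttfamily update\_sum} and {\ttfamily add\_tables} are
hypothetical names, and a {\ttfamily std::map} stands in for a global table.

\begin{lstlisting}
#include <map>

typedef std::map<int, double> Table;

// Mimics update() with a Sum accumulator: add 'value' to the
// existing entry; a missing entry starts out as 0.
void update_sum(Table& t, int key, double value) {
  t[key] += value;
}

// Mimics the two PMap passes: fold every entry of 'a', then
// every entry of 'b', into a fresh table 'c'.
Table add_tables(const Table& a, const Table& b) {
  Table c;
  for (Table::const_iterator it = a.begin(); it != a.end(); ++it) {
    update_sum(c, it->first, it->second);
  }
  for (Table::const_iterator it = b.begin(); it != b.end(); ++it) {
    update_sum(c, it->first, it->second);
  }
  return c;
}
\end{lstlisting}

Keys present in only one input simply keep that input's value, since the other
side contributes nothing to the sum.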

We can now close out our main function, and register ourselves with the example
runner:

\begin{lstlisting}
  return 0;
}
REGISTER_RUNNER(SimpleProgram);
\end{lstlisting}

Adding an entry to src/examples/CMakeLists.txt will add our program to the build
system:
\begin{lstlisting}
add_library(example
             simple-program.pp.cc
             ...
\end{lstlisting}

(The build system will generate a .cc file by preprocessing the various PMap and
PRun calls).

We can now invoke {\ttfamily make}, and you should be able to execute your program via:

{\ttfamily  bin/release/examples/example-dsm --runner=SimpleProgram --workers=40}


\section{Conclusion}
These basic features can be used to build much more complicated and interesting
programs.  For a simple, yet slightly more interesting demonstration, check out
k-means.pp, a parallel k-means kernel.
\end{document}
